#include "demo.h"




int epd_client_conn(epd_client_t* const client, const char* server_ip, int port, int timeout)
{
	if(EPD_UNLIKELY(client == NULL) || EPD_UNLIKELY(server_ip == NULL) || EPD_UNLIKELY(port <= 0))
	{
		EPD_ERR("invalid parameters, client=%p, server_ip=%p, port=%d.\n", client, server_ip, port);
		return -EPD_INVALID_PARAM;
	}
	strcpy(client->ip, server_ip);
	client->port = port;
	
	client->sock_fd = socket(AF_INET, SOCK_STREAM, 0);
	if(client->sock_fd < 0)  
	{  
		EPD_ERR("open socket failed, %s:%d !\n", server_ip, port);
		return EPD_FAILED;
	}  
	
	struct sockaddr_in ser_addr;
	memset(&ser_addr, 0, sizeof(ser_addr));
	ser_addr.sin_family	= AF_INET;
	ser_addr.sin_port	= htons(port);
	inet_aton(server_ip, (struct in_addr *)&ser_addr.sin_addr);
	if(connect(client->sock_fd, (struct sockaddr *)&ser_addr, sizeof(ser_addr)) < 0)  
	{  
		EPD_ERR("connect to %s:%d failed, errno %d !\n", server_ip, port, errno);
		close(client->sock_fd);
		client->sock_fd = -1;
		return EPD_FAILED;
	}
	
	int buf_size = EPD_SIZE_1K * 256;
	if(setsockopt(client->sock_fd, SOL_SOCKET, SO_SNDBUF, &buf_size, sizeof(buf_size)) != 0)
	{
		EPD_ERR("set send buffer size failed, %s.\n", strerror(errno));
		close(client->sock_fd);
		client->sock_fd = -1;
		return EPD_FAILED;
	}
	
	if(setsockopt(client->sock_fd, SOL_SOCKET, SO_RCVBUF, &buf_size, sizeof(buf_size)) != 0)
	{
		EPD_ERR("set receive buffer size failed, %s.\n", strerror(errno));
		close(client->sock_fd);
		client->sock_fd = -1;
		return EPD_FAILED;
	}
	
	struct timeval tv;
	tv.tv_sec = timeout / 1000;
	tv.tv_usec = (timeout % 1000) * 1000;
	if(setsockopt(client->sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) != 0)
	{
		EPD_ERR("set send recv error, %d.\n", errno);
		close(client->sock_fd);
		client->sock_fd = -1;
		return EPD_FAILED;
	}
	
	if(setsockopt(client->sock_fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) != 0)
	{
		EPD_ERR("set send timeout error, %d.\n", errno);
		close(client->sock_fd);
		client->sock_fd = -1;
		return EPD_FAILED;
	}
	
	int flags = fcntl(client->sock_fd, F_GETFL, 0);
	if(EPD_UNLIKELY(flags == -1))
	{
		EPD_ERR("get fcntl error, %d.\n", errno);
		close(client->sock_fd);
		client->sock_fd = -1;
		return EPD_FAILED;
	}
	
	if(EPD_UNLIKELY(fcntl(client->sock_fd, F_SETFL, flags | O_NONBLOCK) == -1))
	{
		EPD_ERR("set socket nonblock failed.\n, %d.\n", errno);
		close(client->sock_fd);
		client->sock_fd = -1;
		return EPD_FAILED;
	}
	
	client->epfd = epoll_create(EPD_CLIENT_EPOLL_FD_MAX);
	if(client->epfd < 0)
	{
		EPD_ERR("create epoll failed, %d.\n", errno);
		close(client->sock_fd);
		client->sock_fd = -1;
		return EPD_FAILED;
	}
	
	if(epd_epoll_fd_add(client->epfd, client->sock_fd, EPD_FALSE) != EPD_OK)
	{
		EPD_ERR("add sock epoll failed, %d.\n", errno);
		close(client->epfd);
		close(client->sock_fd);
		client->sock_fd = -1;
		return EPD_FAILED;
	}
	
	return EPD_OK;
}


int epd_client_disc(epd_client_t* const client, int timeout)
{
	if(EPD_UNLIKELY(client == NULL))
	{
		EPD_ERR("invalid parameters, client=%p.\n", client);
		return -EPD_INVALID_PARAM;
	}
	
	if(client->sock_fd > 0)
	{
		close(client->epfd);
		close(client->sock_fd);
		client->sock_fd = -1;
	}
	return EPD_OK;
}


EPD_LOCAL int epd_clinet_resp_wait(epd_client_t* const client, int timeout)
{
	struct epoll_event events[EPD_CLIENT_EPOLL_EVT_MAX];
	memset(events, 0, sizeof(events));
	
	int event_num = epoll_wait(client->epfd, events, EPD_NUM_OF(events), timeout);
	if(event_num <= 0)
	{
//		EPD_ERR("wait failed, errno %d.\n", event_num);
		return EPD_FAILED;
	}
	
	if(EPD_UNLIKELY(events[0].data.fd != client->sock_fd))
	{
		EPD_ERR("error epoll event fd.\n");
		return EPD_FAILED;
	}
	
	if(EPD_UNLIKELY((events[0].events & EPOLLIN) == 0))
	{
		EPD_ERR("error event direction.\n");
		return EPD_FAILED;
	}
	
	return EPD_OK;
}


int epd_client_req(epd_client_t* const client, epd_req_t* const req, int timeout)
{
	int ret = EPD_OK;
	
	if(EPD_UNLIKELY(client == NULL) || EPD_UNLIKELY(req == NULL))
	{
		EPD_ERR("invalid parameters, client=%p, req=%p.\n", client, req);
		return -EPD_INVALID_PARAM;
	}
	
	if(client->sock_fd < 0)
	{
		EPD_ERR("client has not connect to %s:%d!\n", client->ip, client->port);
		return EPD_FAILED;
	}
	
	ret = send(client->sock_fd, req, sizeof(*req), MSG_WAITALL);
	if(ret != sizeof(*req))
	{
		EPD_ERR("send failed, errno %d.\n", ret);
		return EPD_FAILED;
	}
	
	ret = epd_clinet_resp_wait(client, timeout);
	if(ret != EPD_OK)
	{
//		EPD_ERR("wait response failed, %d.\n", ret);
		return EPD_FAILED;
	}
	
	ret = recv(client->sock_fd, req, sizeof(*req), MSG_DONTWAIT | MSG_WAITALL);
	if(ret != sizeof(*req))
	{
		EPD_ERR("recv response failed, errno %d.\n", ret);
		return EPD_FAILED;
	}
	
	return EPD_OK;
}


int epd_client_req_test(epd_client_t* const client, uint64_t idx)
{
	const int val = rand();
	struct timeval tm;
	epd_req_t req;
	
	gettimeofday(&tm, NULL);
	req.idx = idx++;
	
	snprintf(req.name, sizeof(req.name), "client %lu", req.idx);
	snprintf(req.info, sizeof(req.info), "[%ld.%09ld] test %s", tm.tv_sec, tm.tv_usec, asctime(localtime(&tm.tv_sec)));
	
	req.val = val;
	
	if(epd_client_req(client, &req, 50) != EPD_OK)
	{
//		EPD_ERR("[%06lu] request failed %s - %s", req.idx, req.name, req.info);
		return EPD_FAILED;
	}
	
//	EPD_OUT("[%06lu] [%012lu->%012lu %s] %s - %s", req.idx, val, req.val, (val + 1) == req.val ? "OK" : "FAILED", req.name, req.info);
	return EPD_OK;
}

EPD_LOCAL uint64_t epd_conn_num = 0;
EPD_LOCAL uint64_t epd_run_time = 0;
EPD_LOCAL struct timeval epd_last_tm;

EPD_LOCAL void epd_timer_proc(int signum)
{
	EPD_LOCAL last_conn_num = 0;
	struct timeval tm;
	int interval = 0;
	
	epd_run_time ++;
	gettimeofday(&tm, NULL);
	
	EPD_OUT("Current Run Time : %05lu, ", epd_run_time);
	EPD_OUT("Connect Number   : %08lu, ", epd_conn_num);
	
	interval = tm.tv_sec * 1000000 + tm.tv_usec - epd_last_tm.tv_sec * 1000000 - epd_last_tm.tv_usec;
	if(epd_conn_num >= last_conn_num)
	{
		EPD_OUT("Connect Speed    : %05ld.\n", (epd_conn_num - last_conn_num) * 1000000 / interval);
	}
	else
	{
		EPD_OUT("Connect Speed    : -%05ld.\n", (last_conn_num - epd_conn_num) * 1000000 / interval);
	}
	
	epd_last_tm.tv_sec 	= tm.tv_sec;
	epd_last_tm.tv_usec	= tm.tv_usec;
	last_conn_num		= epd_conn_num;
}


EPD_LOCAL void epd_timer_init(void)
{
	struct itimerval value, ovalue;
	
	signal(SIGALRM, epd_timer_proc);
	
	value.it_value.tv_sec = 1;
	value.it_value.tv_usec = 0;
	value.it_interval.tv_sec = 1;
	value.it_interval.tv_usec = 0;
	setitimer(ITIMER_REAL, &value, &ovalue);
	
	gettimeofday(&epd_last_tm, NULL);
	EPD_OUT("Start - PID %d, [%ld.%09ld] %s", getpid(), epd_last_tm.tv_sec, epd_last_tm.tv_usec, 
			asctime(localtime(&epd_last_tm.tv_sec)));
}


EPD_LOCAL int is_break = EPD_FALSE;

EPD_LOCAL void epd_client_intc(int signum)
{
	is_break = EPD_TRUE;
}


EPD_LOCAL void epd_client_wait(int speed, int interval)
{
	if(interval < 1000000/speed)
	{
		usleep(1000000/speed>>2);
	}
	else
	{
		usleep(1);
	}
}


int main(int argc, const char* argv[])
{
	static uint64_t idx =0;
	epd_client_t* clients;
	const char* server_ip	= "127.0.0.1";
	int server_port	= atoi("9999");
	int client_num	= 50000;
	int conn_speed	= 3000;
	int i;
	
	if(argc < 4)
	{
		EPD_OUT("client <server ip> <server port> <client number> <connect speed>.\n");
		return -EPD_INVALID_PARAM;
	}
	server_ip	= argv[1];
	server_port	= atoi(argv[2]);
	client_num	= atoi(argv[3]);
	conn_speed	= atoi(argv[4]);
	
	if((server_port <= 1000) || (client_num <= 0) || (conn_speed <=0))
	{
		EPD_ERR("invalid parameters, server_port %d, client_num %d, conn_speed %d.\n", server_port, client_num, conn_speed);
		return -EPD_INVALID_PARAM;
	}
	
	if(signal(SIGINT, epd_client_intc) == SIG_ERR)
	{
		EPD_ERR("install ctrl+c signal failed.\n");
		return EPD_FAILED;
	}
	
	epd_timer_init();
	
	clients = malloc(sizeof(*clients) * client_num);
	if(clients == NULL)
	{
		EPD_ERR("no enough memory.\n");
		return -EPD_NO_MEMORY;
	}
	memset(clients, 0, sizeof(*clients) * client_num);
	
	srand((int)time(0));
	
	for(i=0; i<client_num; i++)
	{
		uint64_t interval;
		struct timeval tm;
		
		gettimeofday(&tm, NULL);
		interval = tm.tv_sec * 1000000 + tm.tv_usec;
		
		if(epd_client_conn(&clients[i], server_ip, server_port, 300) != EPD_OK || is_break)
		{  
			EPD_ERR("client connect failed, current connect number is %d.\n", epd_conn_num);
			free(clients);
			return EPD_FAILED;
		}
		epd_conn_num ++;
		
		gettimeofday(&tm, NULL);
		interval = tm.tv_sec * 1000000 + tm.tv_usec - interval;
		
		epd_client_wait(conn_speed, interval);
	}
	EPD_INF("connect to %s:%d ok, run time %d.\n", server_ip, server_port, epd_run_time);
	
	while(!is_break)
	{
		for(i=0; i<client_num && !is_break; i++)
		{
			epd_client_req_test(&clients[i], idx);
			usleep(100);
		}
		sleep(1);
	}
	
	EPD_INF("disconnect from %s:%d ok.\n", server_ip, server_port);
	for(i=0; i<client_num; i++)
	{
		uint64_t interval;
		struct timeval tm;
		
		gettimeofday(&tm, NULL);
		interval = tm.tv_sec * 1000000 + tm.tv_usec;
		
		epd_client_disc(&clients[i], 300);
		epd_conn_num --;
		
		gettimeofday(&tm, NULL);
		interval = tm.tv_sec * 1000000 + tm.tv_usec - interval;
		
		epd_client_wait(conn_speed, interval);
	}
	free(clients);
	return 0;
}



